# -*- coding: utf-8 -*-
# pylint: disable=too-many-branches
"""The dashscope formatter module."""

import json
import os.path
from typing import Any

from ._truncated_formatter_base import TruncatedFormatterBase
from .._logging import logger
from .._utils._common import _is_accessible_local_file
from ..message import (
    Msg,
    TextBlock,
    ImageBlock,
    AudioBlock,
    VideoBlock,
    ToolUseBlock,
    ToolResultBlock,
    URLSource,
)
from ..token import TokenCounterBase


def _format_dashscope_media_block(
    block: ImageBlock | AudioBlock,
) -> dict[str, str]:
    """Format an image or audio block for DashScope API.

    Args:
        block (`ImageBlock` | `AudioBlock`):
            The image or audio block to format.

    Returns:
        `dict[str, str]`:
            A dictionary with "image" or "audio" key and the formatted URL or
            data URI as value.

    Raises:
        `NotImplementedError`:
            If the source type is not supported.
    """
    typ = block["type"]
    source = block["source"]
    if source["type"] == "url":
        url = source["url"]
        if _is_accessible_local_file(url):
            return {typ: "file://" + os.path.abspath(url)}
        else:
            # treat as web url
            return {typ: url}

    elif source["type"] == "base64":
        media_type = source["media_type"]
        base64_data = source["data"]
        return {
            typ: f"data:{media_type};base64,{base64_data}",
        }

    else:
        raise NotImplementedError(
            f"Unsupported source type '{source.get('type')}' "
            f"for {typ} block.",
        )


def _reformat_messages(
    messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Reformat the content to be compatible with HuggingFaceTokenCounter.

     This function processes a list of messages and converts multi-part
     text content into single string content when all parts are plain text.
     This is necessary for compatibility with HuggingFaceTokenCounter which
     expects simple string content rather than structured content with
     multiple parts.

    Args:
        messages (list[dict[str, Any]]):
            A list of message dictionaries where each message may contain a
            "content" field. The content can be either:
            - A string (unchanged)
            - A list of content items, where each item is a dict that may
                contain "text", "type", and other fields

    Returns:
        list[dict[str, Any]]:
            A list of reformatted messages. For messages where all content
            items are plain text (have "text" field and either no "type"
            field or "type" == "text"), the content list is converted to a
            single newline-joined string. Other messages remain unchanged.

    Example:
        .. code-block:: python

            # Case 1: All text content - will be converted
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"text": "Hello", "type": "text"},
                        {"text": "World", "type": "text"}
                    ]
                }
            ]
            result = _reformat_messages(messages)
            print(result[0]["content"])
            # Output: "Hello\nWorld"

            # Case 2: Mixed content - will remain unchanged
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"text": "Hello", "type": "text"},
                        {"image_url": "...", "type": "image"}
                    ]
                }
            ]

            result = _reformat_messages(messages)  # remain unchanged
            print(type(result[0]["content"]))
            # Output: <class 'list'>

    """
    for message in messages:
        content = message.get("content", [])

        is_all_text = True
        texts = []
        for item in content:
            if not isinstance(item, dict) or "text" not in item:
                is_all_text = False
                break
            if "type" in item and item["type"] != "text":
                is_all_text = False
                break
            if item["text"]:
                texts.append(item["text"])

        if is_all_text and texts:
            message["content"] = "\n".join(texts)

    return messages


class DashScopeChatFormatter(TruncatedFormatterBase):
    """The DashScope formatter class for chatbot scenario, where only a user
    and an agent are involved. We use the `role` field to identify different
    entities in the conversation.
    """

    support_tools_api: bool = True
    """Whether support tools API"""

    support_multiagent: bool = False
    """Whether support multi-agent conversations"""

    support_vision: bool = True
    """Whether support vision data"""

    supported_blocks: list[type] = [
        TextBlock,
        ImageBlock,
        AudioBlock,
        VideoBlock,
        ToolUseBlock,
        ToolResultBlock,
    ]

    def __init__(
        self,
        promote_tool_result_images: bool = False,
        promote_tool_result_audios: bool = False,
        promote_tool_result_videos: bool = False,
        token_counter: TokenCounterBase | None = None,
        max_tokens: int | None = None,
    ) -> None:
        """Initialize the DashScope chat formatter.

        Args:
            promote_tool_result_images (`bool`, defaults to `False`):
                Whether to promote images from tool results to user messages.
                Most LLM APIs don't support images in tool result blocks, but
                do support them in user message blocks. When `True`, images are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            promote_tool_result_audios (`bool`, defaults to `False`):
                Whether to promote audios from tool results to user messages.
                Most LLM APIs don't support audios in tool result blocks, but
                do support them in user message blocks. When `True`, audios are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            promote_tool_result_videos (`bool`, defaults to `False`):
                Whether to promote videos from tool results to user messages.
                Most LLM APIs don't support videos in tool result blocks, but
                do support them in user message blocks. When `True`, videos are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            token_counter (`TokenCounterBase | None`, optional):
                A token counter instance used to count tokens in the messages.
                If not provided, the formatter will format the messages
                without considering token limits.
            max_tokens (`int | None`, optional):
                The maximum number of tokens allowed in the formatted
                messages. If not provided, the formatter will not truncate
                the messages.
        """
        super().__init__(token_counter, max_tokens)
        self.promote_tool_result_images = promote_tool_result_images
        self.promote_tool_result_audios = promote_tool_result_audios
        self.promote_tool_result_videos = promote_tool_result_videos

    async def _format(
        self,
        msgs: list[Msg],
    ) -> list[dict[str, Any]]:
        """Format message objects into DashScope API format.

        Args:
            msgs (`list[Msg]`):
                The list of message objects to format.

        Returns:
            `list[dict[str, Any]]`:
                The formatted messages as a list of dictionaries.
        """
        self.assert_list_of_msgs(msgs)

        formatted_msgs: list[dict] = []

        i = 0
        while i < len(msgs):
            msg = msgs[i]
            content_blocks: list[dict[str, Any]] = []
            tool_calls = []

            for block in msg.get_content_blocks():
                typ = block.get("type")

                if typ == "text":
                    content_blocks.append(
                        {
                            "text": block.get("text"),
                        },
                    )

                elif typ in ["image", "audio", "video"]:
                    content_blocks.append(
                        _format_dashscope_media_block(
                            block,  # type: ignore[arg-type]
                        ),
                    )

                elif typ == "tool_use":
                    tool_calls.append(
                        {
                            "id": block.get("id"),
                            "type": "function",
                            "function": {
                                "name": block.get("name"),
                                "arguments": json.dumps(
                                    block.get("input", {}),
                                    ensure_ascii=False,
                                ),
                            },
                        },
                    )

                elif typ == "tool_result":
                    (
                        textual_output,
                        multimodal_data,
                    ) = self.convert_tool_result_to_string(block["output"])

                    # First add the tool result message in DashScope API format
                    formatted_msgs.append(
                        {
                            "role": "tool",
                            "tool_call_id": block.get("id"),
                            "content": textual_output,
                            "name": block.get("name"),
                        },
                    )

                    # Then, handle the multimodal data if any
                    promoted_blocks: list = []
                    for url, multimodal_block in multimodal_data:
                        if (
                            multimodal_block["type"] == "image"
                            and self.promote_tool_result_images
                        ):
                            promoted_blocks.extend(
                                [
                                    TextBlock(
                                        type="text",
                                        text=f"\n- The image from '{url}': ",
                                    ),
                                    ImageBlock(
                                        type="image",
                                        source=URLSource(
                                            type="url",
                                            url=url,
                                        ),
                                    ),
                                ],
                            )
                        elif (
                            multimodal_block["type"] == "audio"
                            and self.promote_tool_result_audios
                        ):
                            promoted_blocks.extend(
                                [
                                    TextBlock(
                                        type="text",
                                        text=f"\n- The audio from '{url}': ",
                                    ),
                                    AudioBlock(
                                        type="audio",
                                        source=URLSource(
                                            type="url",
                                            url=url,
                                        ),
                                    ),
                                ],
                            )
                        elif (
                            multimodal_block["type"] == "video"
                            and self.promote_tool_result_videos
                        ):
                            promoted_blocks.extend(
                                [
                                    TextBlock(
                                        type="text",
                                        text=f"\n- The video from '{url}': ",
                                    ),
                                    VideoBlock(
                                        type="video",
                                        source=URLSource(
                                            type="url",
                                            url=url,
                                        ),
                                    ),
                                ],
                            )

                    if promoted_blocks:
                        # Insert promoted blocks as new user message(s)
                        promoted_blocks = [
                            TextBlock(
                                type="text",
                                text="<system-info>The following are "
                                f"the media contents from the tool "
                                f"result of '{block['name']}':",
                            ),
                            *promoted_blocks,
                            TextBlock(
                                type="text",
                                text="</system-info>",
                            ),
                        ]

                        msgs.insert(
                            i + 1,
                            Msg(
                                name="user",
                                content=promoted_blocks,
                                role="user",
                            ),
                        )

                else:
                    logger.warning(
                        "Unsupported block type %s in the message, skipped.",
                        typ,
                    )

            msg_dashscope = {
                "role": msg.role,
                "content": content_blocks or [{"text": None}],
            }

            if tool_calls:
                msg_dashscope["tool_calls"] = tool_calls

            if msg_dashscope["content"] != [
                {"text": None},
            ] or msg_dashscope.get(
                "tool_calls",
            ):
                formatted_msgs.append(msg_dashscope)

            # Move to next message
            i += 1

        return _reformat_messages(formatted_msgs)


class DashScopeMultiAgentFormatter(TruncatedFormatterBase):
    """DashScope formatter for multi-agent conversations, where more than
    a user and an agent are involved.

    .. note:: This formatter will combine previous messages (except tool
     calls/results) into a history section in the first system message with
     the conversation history prompt.

    .. note:: For tool calls/results, they will be presented as separate
     messages as required by the DashScope API. Therefore, the tool calls/
     results messages are expected to be placed at the end of the input
     messages.

    .. tip:: Telling the assistant's name in the system prompt is very
     important in multi-agent conversations. So that LLM can know who it
     is playing as.

    """

    support_tools_api: bool = True
    """Whether support tools API"""

    support_multiagent: bool = True
    """Whether support multi-agent conversations"""

    support_vision: bool = True
    """Whether support vision data"""

    supported_blocks: list[type] = [
        TextBlock,
        # Multimodal
        ImageBlock,
        AudioBlock,
        VideoBlock,
        # Tool use
        ToolUseBlock,
        ToolResultBlock,
    ]
    """The list of supported message blocks"""

    def __init__(
        self,
        conversation_history_prompt: str = (
            "# Conversation History\n"
            "The content between <history></history> tags contains "
            "your conversation history\n"
        ),
        promote_tool_result_images: bool = False,
        promote_tool_result_audios: bool = False,
        promote_tool_result_videos: bool = False,
        token_counter: TokenCounterBase | None = None,
        max_tokens: int | None = None,
    ) -> None:
        """Initialize the DashScope multi-agent formatter.

        Args:
            conversation_history_prompt (`str`):
                The prompt to use for the conversation history section.
            promote_tool_result_images (`bool`, defaults to `False`):
                Whether to promote images from tool results to user messages.
                Most LLM APIs don't support images in tool result blocks, but
                do support them in user message blocks. When `True`, images are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            promote_tool_result_audios (`bool`, defaults to `False`):
                Whether to promote audios from tool results to user messages.
                Most LLM APIs don't support audios in tool result blocks, but
                do support them in user message blocks. When `True`, audios are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            promote_tool_result_videos (`bool`, defaults to `False`):
                Whether to promote videos from tool results to user messages.
                Most LLM APIs don't support videos in tool result blocks, but
                do support them in user message blocks. When `True`, videos are
                extracted and appended as a separate user message with
                explanatory text indicating their source.
            token_counter (`TokenCounterBase | None`, optional):
                The token counter used for truncation.
            max_tokens (`int | None`, optional):
                The maximum number of tokens allowed in the formatted
                messages. If `None`, no truncation will be applied.
        """
        super().__init__(token_counter=token_counter, max_tokens=max_tokens)
        self.conversation_history_prompt = conversation_history_prompt
        self.promote_tool_result_images = promote_tool_result_images
        self.promote_tool_result_audios = promote_tool_result_audios
        self.promote_tool_result_videos = promote_tool_result_videos

    async def _format_tool_sequence(
        self,
        msgs: list[Msg],
    ) -> list[dict[str, Any]]:
        """Given a sequence of tool call/result messages, format them into
        the required format for the DashScope API.

        Args:
            msgs (`list[Msg]`):
                The list of messages containing tool calls/results to format.

        Returns:
            `list[dict[str, Any]]`:
                A list of dictionaries formatted for the DashScope API.
        """
        return await DashScopeChatFormatter(
            promote_tool_result_images=self.promote_tool_result_images,
            promote_tool_result_audios=self.promote_tool_result_audios,
            promote_tool_result_videos=self.promote_tool_result_videos,
        ).format(msgs)

    async def _format_agent_message(
        self,
        msgs: list[Msg],
        is_first: bool = True,
    ) -> list[dict[str, Any]]:
        """Given a sequence of messages without tool calls/results, format
        them into a user message with conversation history tags. For the
        first agent message, it will include the conversation history prompt.

        Args:
            msgs (`list[Msg]`):
                A list of Msg objects to be formatted.
            is_first (`bool`, defaults to `True`):
                Whether this is the first agent message in the conversation.
                If `True`, the conversation history prompt will be included.

        Returns:
            `list[dict[str, Any]]`:
                A list of dictionaries formatted for the DashScope API.
        """

        if is_first:
            conversation_history_prompt = self.conversation_history_prompt
        else:
            conversation_history_prompt = ""

        # Format into required DashScope format
        formatted_msgs: list[dict] = []

        # Collect the multimodal files
        conversation_blocks = []
        accumulated_text = []
        for msg in msgs:
            for block in msg.get_content_blocks():
                if block["type"] == "text":
                    accumulated_text.append(f"{msg.name}: {block['text']}")

                elif block["type"] in ["image", "audio", "video"]:
                    # Handle the accumulated text as a single block
                    if accumulated_text:
                        conversation_blocks.append(
                            {"text": "\n".join(accumulated_text)},
                        )
                        accumulated_text.clear()

                    if block["source"]["type"] == "url":
                        url = block["source"]["url"]
                        if _is_accessible_local_file(url):
                            conversation_blocks.append(
                                {
                                    block["type"]: "file://"
                                    + os.path.abspath(url),
                                },
                            )
                        else:
                            conversation_blocks.append({block["type"]: url})

                    elif block["source"]["type"] == "base64":
                        media_type = block["source"]["media_type"]
                        base64_data = block["source"]["data"]
                        conversation_blocks.append(
                            {
                                block[
                                    "type"
                                ]: f"data:{media_type};base64,{base64_data}",
                            },
                        )

                    else:
                        logger.warning(
                            "Unsupported block type %s in the message, "
                            "skipped.",
                            block["type"],
                        )

        if accumulated_text:
            conversation_blocks.append({"text": "\n".join(accumulated_text)})

        if conversation_blocks:
            if conversation_blocks[0].get("text"):
                conversation_blocks[0]["text"] = (
                    conversation_history_prompt
                    + "<history>\n"
                    + conversation_blocks[0]["text"]
                )

            else:
                conversation_blocks.insert(
                    0,
                    {
                        "text": conversation_history_prompt + "<history>\n",
                    },
                )

            if conversation_blocks[-1].get("text"):
                conversation_blocks[-1]["text"] += "\n</history>"

            else:
                conversation_blocks.append({"text": "</history>"})

            formatted_msgs.append(
                {
                    "role": "user",
                    "content": conversation_blocks,
                },
            )

        return _reformat_messages(formatted_msgs)

    async def _format_system_message(
        self,
        msg: Msg,
    ) -> dict[str, Any]:
        """Format system message for DashScope API."""
        return {
            "role": "system",
            "content": msg.get_text_content(),
        }
